<?php

namespace App\Command;

use App\Command\Make\MakeTableRules;
use Framework\Foundation\Config;
use Framework\Foundation\Database\FDB;
use Symfony\Component\Console\Command\Command;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
use Symfony\Component\Console\Input\InputArgument;
use App\Command\Util\Parser;
use xingwenge\canal_php\CanalConnectorFactory;
use xingwenge\canal_php\CanalClient;
use xingwenge\canal_php\Fmt;
use Com\Alibaba\Otter\Canal\Protocol\RowChange;
use Com\Alibaba\Otter\Canal\Protocol\EventType;
use Com\Alibaba\Otter\Canal\Protocol\EntryType;

class Canal extends Command
{
    protected $transactionTag = '';

    // the name of the command (the part after "bin/console")
    protected static $defaultName = 'canal:client';

//    public function __construct()
//    {
//        parent::__construct();
//    }

    protected function configure()
    {
        $this->setDescription('canal 客户端');
        if (!empty($arguments)) {
            foreach ($arguments as $argument) {
                $this->addArgument($argument, InputArgument::REQUIRED);
            }
        }

        if (!empty($options)) {
            foreach ($options as $option) {
                $this->addArgument($option, InputArgument::OPTIONAL);
            }
        }
    }

    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $config = \Framework\Foundation\Ini::get('canal');
        try {
            $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SOCKET_CLUE);
            # $client = CanalConnectorFactory::createClient(CanalClient::TYPE_SWOOLE);

            $client->connect($config['host'], $config['port']);
            $client->checkValid();
//            $client->subscribe("1003", "wukong", '*\..*');
//            $client->subscribe("1003", "wukong", 'contra_test_rw\.z_canal_test,eas_test\.classes,eas_test\.class_teachers,basedata_test_db\.employee,basedata_test_db\.teacher,contra_test_rw\.contract,user_center_test\.parent,user_center_test\.student');
            $client->subscribe($config['client_id'], $config['destination'], $config['filter']);
            echo 'connect success', PHP_EOL;

            while (true) {
                $message = $client->get(100);
//                var_dump($message->getMemberId());
                if ($entries = $message->getEntries()) {
                    /**
                     * @var $entry \Com\Alibaba\Otter\Canal\Protocol\Entry;
                     */
                    foreach ($entries as $entry) {
                        Fmt::println($entry);
//                        echo '============', PHP_EOL;
                        $info = $this->extractElementInfo($entry);
                        if (!empty($info)) {
                            FDB::insertInto('ddd_event_stream')->values($info)->execute();
                        }
                    }
                }
                sleep(1);
            }

            $client->disConnect();
        } catch (\Exception $e) {
            echo $e->getMessage(), PHP_EOL;
        }
        echo '<pre>';
        print_r('done');
        exit('</pre>');
    }

    private function ptColumn($columns)
    {
        /** @var $column \Com\Alibaba\Otter\Canal\Protocol\Column; */
        foreach ($columns as $column) {
            echo sprintf(
                "%s : %s  update= %s",
                $column->getName(),
                $column->getValue(),
                var_export($column->getUpdated(), true)
            ), PHP_EOL;
        }
    }

    public function extractElementInfo($entry)
    {
        $entryType = $entry->getEntryType();
        switch ($entryType) {
            case EntryType::TRANSACTIONBEGIN:
                echo 'transaction begin', PHP_EOL;
                $this->transactionTag = uniqid() . '-' . time();
                echo $this->transactionTag, PHP_EOL;
                return null;
            case EntryType::TRANSACTIONEND:
                echo 'transaction end', PHP_EOL;
                $this->transactionTag = '';
                return null;
        }
        $infoData = [];
        $infoHeader['db_name'] = $entry->getHeader()->getSchemaName();
        $infoHeader['table_name'] = $entry->getHeader()->getTableName();
//        $infoHeader['header'] = $entry->getHeader();
        $infoHeader['transaction_tag'] = $this->transactionTag;

        $rowChange = new RowChange();
        $rowChange->mergeFromString($entry->getStoreValue());
        $eventType = $rowChange->getEventType();

        /**
         * @var $rowData \Com\Alibaba\Otter\Canal\Protocol\RowData;
         */
        foreach ($rowChange->getRowDatas() as $rowData) {
            $info = [];
            $columnNames = [];
            $updateColumnNames = [];
            $updateData = [];
            switch ($eventType) {
                case EventType::INSERT:
                    $columns = $rowData->getAfterColumns();
                    foreach ($columns as $column) {
                        $columnNames[] = $column->getName();
                        $updateColumnNames[] = $column->getName();
                        $updateData[$column->getName()] = $column->getValue();
                    }

                    $info['event_type'] = $eventType;
                    $info['columns'] = implode(',', $columnNames);
                    $info['update_columns'] = implode(',', $updateColumnNames);
                    $info['update_data'] = $updateData;
                    break;
                case EventType::UPDATE:

                    $columns = $rowData->getAfterColumns();
                    foreach ($columns as $column) {
                        $columnNames[] = $column->getName();
                        if ($column->getUpdated()) {
                            $updateColumnNames[] = $column->getName();
                            $updateData[$column->getName()] = $column->getValue();
                        }
                    }

                    $skipCheck = array_diff($updateColumnNames, ['sys_update_dc', 'updated_at']);
                    if (!empty($skipCheck)) {
                        $info['event_type'] = $entryType;
                        $info['columns'] = implode(',', $columnNames);
                        $info['update_columns'] = implode(',', $updateColumnNames);
                        $info['update_data'] = $updateData;
                    }
//                    self::ptColumn($rowData->getBeforeColumns());
                    break;
                case EventType::DELETE:  // 标识为删除
                    $columns = $rowData->getAfterColumns();
                    foreach ($columns as $column) {
                        $columnNames[] = $column->getName();
                        $updateColumnNames[] = $column->getName();
                        $updateData[$column->getName()] = $column->getValue();
                    }

                    $info['event_type'] = $entryType;
                    $info['columns'] = implode(',', $columnNames);
                    $info['update_columns'] = implode(',', $updateColumnNames);
                    $info['update_data'] = $updateData;
                    break;
                default:
                    echo '-------> 不关心的事件 <-------', PHP_EOL;
                    break;
            }

            if (!empty($info)) {
                $info['update_value'] = json_encode($info['update_data'], JSON_UNESCAPED_UNICODE);
                unset($info['update_data']);
                $infoData[] = array_merge($infoHeader, $info);
            }
        }

        return $infoData;
    }
}
